Extension point streamProcessor
In component org.nuxeo.runtime.stream.service
Contribution Descriptors
- Class: org.nuxeo.runtime.stream.StreamProcessorDescriptor
Existing Contributions
Contributions are presented in the same order as the registration order on this extension point. This order is displayed before the contribution name, in brackets.
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.runtime.stream.StreamMetricsProcessor" enabled="true" name="metrics"> <policy continueOnFailure="false" delay="1s" maxDelay="10s" maxRetries="5" name="default"/> <stream codec="avro" name="input/null" partitions="1"/> <computation concurrency="1" name="stream/metrics"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.core.security.RetentionExpiredAction" defaultConcurrency="1" defaultPartitions="1" name="retentionExpired"> <!-- continue on failure, because failure to expire retention doesn't give us an inconsistent state --> <policy continueOnFailure="true" delay="1s" maxDelay="60s" maxRetries="20" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.core.action.DeletionAction" defaultConcurrency="2" defaultPartitions="2" name="deletion"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.drive.action.FireGroupUpdatedEventAction" defaultConcurrency="2" defaultPartitions="4" name="driveFireGroupUpdatedEvent"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <!-- CSV exporter processor --> <streamProcessor class="org.nuxeo.ecm.platform.csv.export.action.CSVExportAction" defaultConcurrency="2" defaultPartitions="2" name="csvExport"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> <stream name="bulk/makeBlob"> <filter class="org.nuxeo.ecm.core.transientstore.computation.TransientStoreOverflowRecordFilter" name="overflow"> <option name="storeName">default</option> <option name="prefix">csvoverflow</option> <option name="thresholdSize">990000</option> </filter> </stream> <option name="produceImmediate">false</option> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.platform.picture.recompute.RecomputeViewsAction" defaultConcurrency="2" defaultPartitions="6" name="recomputeViews"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.platform.video.action.RecomputeVideoConversionsAction" defaultConcurrency="2" defaultPartitions="6" name="recomputeVideoConversions"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.platform.audit.impl.StreamAuditWriter" defaultCodec="avro" defaultConcurrency="1" defaultPartitions="1" enabled="true" name="auditWriter"> <policy batchCapacity="25" batchThreshold="500ms" continueOnFailure="false" delay="1s" maxDelay="60s" maxRetries="20" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.core.bulk.BulkServiceProcessor" defaultCodec="avro" defaultConcurrency="1" defaultExternal="true" defaultPartitions="1" name="bulkServiceProcessor" start="false"> <stream external="false" name="bulk/command"/> <stream external="false" name="bulk/status"/> <stream external="false" name="bulk/done"/> <policy continueOnFailure="false" delay="1s" maxDelay="60s" maxRetries="0" name="bulk/scroller"/> <policy continueOnFailure="false" delay="1s" maxDelay="60s" maxRetries="20" name="bulk/status"/> <computation concurrency="2" name="bulk/scroller"/> <computation concurrency="1" name="bulk/status"/> </streamProcessor> <streamProcessor class="org.nuxeo.ecm.core.bulk.introspection.StreamIntrospectionProcessor" defaultCodec="avro" defaultConcurrency="1" defaultPartitions="1" name="streamIntrospection"/> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <!-- SetProperty processor --> <streamProcessor class="org.nuxeo.ecm.core.bulk.action.SetPropertiesAction" defaultConcurrency="2" defaultPartitions="2" name="setProperties"> <policy continueOnFailure="false" delay="500ms" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> <!-- SetSystemProperty processor --> <streamProcessor class="org.nuxeo.ecm.core.bulk.action.SetSystemPropertiesAction" defaultConcurrency="2" defaultPartitions="2" name="setSystemProperties"> <policy continueOnFailure="false" delay="500ms" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> <!-- RemoveProxy processor --> <streamProcessor class="org.nuxeo.ecm.core.bulk.action.RemoveProxyAction" defaultConcurrency="2" defaultPartitions="2" name="removeProxy"> <policy continueOnFailure="false" delay="500ms" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> <!-- Trash processor --> <streamProcessor class="org.nuxeo.ecm.core.bulk.action.TrashAction" defaultConcurrency="1" defaultPartitions="1" name="trash"> <policy continueOnFailure="false" delay="500ms" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.automation.core.operations.services.bulk.AutomationBulkAction" defaultConcurrency="2" defaultPartitions="2" name="automation"> <policy continueOnFailure="true" delay="1s" maxRetries="3" name="default"/> </streamProcessor> <streamProcessor class="org.nuxeo.ecm.automation.core.operations.services.bulk.AutomationBulkActionUi" defaultConcurrency="2" defaultPartitions="2" name="automationUi"> <policy continueOnFailure="true" delay="1s" maxRetries="3" name="default"/> <option name="failOnError">false</option> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.elasticsearch.bulk.IndexAction" defaultConcurrency="2" defaultPartitions="4" enabled="true" name="indexAction"> <policy continueOnFailure="false" delay="1s" maxDelay="60s" maxRetries="20" name="default"/> <!-- fetch content and build indexing requests --> <computation concurrency="4" name="bulk/index"/> <stream name="bulk/index" partitions="12"/> <!-- submit requests to elastic --> <computation concurrency="2" name="bulk/bulkIndex"/> <stream name="bulk/bulkIndex" partitions="8"> <filter class="org.nuxeo.ecm.core.transientstore.computation.TransientStoreOverflowRecordFilter" name="overflow"> <option name="storeName">default</option> <option name="prefix">index</option> <option name="thresholdSize">990000</option> </filter> </stream> <computation concurrency="1" name="bulk/indexCompletion"/> <!-- optimal size of the elasticsearch bulk request --> <option name="esBulkSizeBytes">5242880</option> <!-- max number of actions in the elasticsearch bulk request --> <option name="esBulkActions">1000</option> <!-- flush elasticsearch bulk request interval --> <option name="flushIntervalSeconds">5</option> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <!-- Update Read ACLs processor --> <streamProcessor class="org.nuxeo.ecm.core.storage.dbs.action.UpdateReadAclsAction" defaultConcurrency="1" defaultPartitions="1" name="updateReadAcls"> <policy continueOnFailure="false" delay="500ms" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>
-
<extension point="streamProcessor" target="org.nuxeo.runtime.stream.service"> <streamProcessor class="org.nuxeo.ecm.platform.thumbnail.action.RecomputeThumbnailsAction" defaultConcurrency="2" defaultPartitions="6" name="recomputeThumbnails"> <policy continueOnFailure="true" delay="1s" maxDelay="10s" maxRetries="3" name="default"/> </streamProcessor> </extension>